[SPARK-18990][SQL] make DatasetBenchmark fairer for Dataset#16391
[SPARK-18990][SQL] make DatasetBenchmark fairer for Dataset#16391cloud-fan wants to merge 4 commits intoapache:masterfrom
Conversation
| * In order to do type checking, use Literal.create() instead of constructor | ||
| */ | ||
| case class Literal (value: Any, dataType: DataType) extends LeafExpression with CodegenFallback { | ||
| case class Literal (value: Any, dataType: DataType) extends LeafExpression { |
There was a problem hiding this comment.
these are unrelated changes. When I looked at the generated codes, I found Literal generated verbose codes, so I simplified it a little bit.
There was a problem hiding this comment.
Is it better to split into two PRs? To put two changes into one PR may not be easy to understand diffs of performance results.
There was a problem hiding this comment.
ok I reverted. The result is almost the same so I didn't update
| val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString)) | ||
| benchmark.addCase("RDD sum") { iter => | ||
| rdd.aggregate(0L)(_ + _.l, _ + _) | ||
| rdd.map(l => (l % 10, l)).reduceByKey(_ + _).foreach(_ => Unit) |
There was a problem hiding this comment.
also test the grouping performance, not only aggregating.
There was a problem hiding this comment.
Is there any reason to add grouping operation?
There was a problem hiding this comment.
aggregate without grouping is not a common use case
There was a problem hiding this comment.
Actually, I think we should also have a test case for aggregation without group by.
| Dataset 4781 / 5155 20.9 47.8 0.7X | ||
| RDD 3963 / 3976 25.2 39.6 1.0X | ||
| DataFrame 826 / 834 121.1 8.3 4.8X | ||
| Dataset 5178 / 5198 19.3 51.8 0.8X |
There was a problem hiding this comment.
for "back-to-back map", the logic is so simple that the code generated by Dataset is less efficient than RDD. RDD just adds 1 to the input Long, the only overhead is boxing, while Dataset generates code like this:
boolean mapelements_isNull = true;
long mapelements_value = -1L;
if (!false) {
mapelements_argValue = range_value;
mapelements_isNull = false;
if (!mapelements_isNull) {
Object mapelements_funcResult = null;
mapelements_funcResult = mapelements_obj.apply(mapelements_argValue);
if (mapelements_funcResult == null) {
mapelements_isNull = true;
} else {
mapelements_value = (Long) mapelements_funcResult;
}
}
}
Dataset still has the boxing overhead, but its code is more verbose. And Dataset has to write the long to un unsafe row at last, which is another overhead. These are the reasons why Dataset is slower than RDD for this simple case.
There was a problem hiding this comment.
IIUC, an signature of apply() is Object apply(Object). It also introduces additional boxing overhead from long to Long.
To reduce these boxing and unboxing overhead, we need to use more concrete signature (e.g. long apply(long) to call a lambda function.
There was a problem hiding this comment.
the method signature in Dataset is: def map[U : Encoder](f: T => U), unless we create primitive version methods, e.g. def map(f: T => Long), I can't think of an easy way to get the concrete signature.
BTW, I think the best solution is to analyze the byte code(class file) of the lambda function, and turn it into expressions.
There was a problem hiding this comment.
I noticed that Scala compiler automatically generates primitive version. Current Spark eventually calls primitive version thru generic version Object apply(Object).
Here is a simple example. When we compile the following Dataset program, we can find that the following class is generated by scalac. Scalac automatically generates a primitive version int apply$mcII$sp(int) that can be called by int apply(int).
We could infer this signature in Catalyst for simple cases.
Of course, I totally agree that the best solution is to analyze byte code and turn it into expression. This was already prototyped. Do you think it is good time to make this prototype more robust now?
test("ds") {
val ds = sparkContext.parallelize((1 to 10), 1).toDS
ds.map(i => i * 7).show
}
$ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class
Compiled from "Test.scala"
public final class org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
public static final long serialVersionUID;
public final int apply(int);
Code:
0: aload_0
1: iload_1
2: invokevirtual #18 // Method apply$mcII$sp:(I)I
5: ireturn
public int apply$mcII$sp(int);
Code:
0: iload_1
1: bipush 7
3: imul
4: ireturn
public final java.lang.Object apply(java.lang.Object);
Code:
0: aload_0
1: aload_1
2: invokestatic #29 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
5: invokevirtual #31 // Method apply:(I)I
8: invokestatic #35 // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer;
11: areturn
public org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5);
Code:
0: aload_0
1: invokespecial #42 // Method scala/runtime/AbstractFunction1$mcII$sp."<init>":()V
4: return
}There was a problem hiding this comment.
ah, scala compiler is smart! I think we can create a ticket to optimize this, i.e. call the primitive apply version, and update the benchmark result.
For byte code analysis, let's discuss about it in the ticket later.
There was a problem hiding this comment.
Sure, I will create a JIRA ticket for this optimization.
For byte code analysis, let's restart discuss about it the JIRA entry.
| Dataset 2777 / 2805 36.0 27.8 0.5X | ||
| RDD 533 / 587 187.6 5.3 1.0X | ||
| DataFrame 79 / 91 1269.0 0.8 6.8X | ||
| Dataset 550 / 559 181.7 5.5 1.0X |
There was a problem hiding this comment.
For "back-to-back filter", Dataset will deserialize the input row to an object and apply the condition function. When the deserialization becomes no-op, Dataset runs almost the same RDD code like the RDD case. So in this case, RDD and Dataset has similar performance.
| RDD sum 1950 / 1995 51.3 19.5 1.0X | ||
| DataFrame sum 587 / 611 170.2 5.9 3.3X | ||
| Dataset sum using Aggregator 3014 / 3222 33.2 30.1 0.6X | ||
| Dataset complex Aggregator 32650 / 34505 3.1 326.5 0.1X |
There was a problem hiding this comment.
For "aggregate", Dataset use AppendColumnsExec to generate the grouping key, which will do an extra copy(the unsafe row joiner). This makes Dataset slower than RDD.
| val ds = spark.range(0, numRows) | ||
| val df = ds.toDF("l") | ||
|
|
||
| val benchmark = new Benchmark("aggregate", numRows) |
There was a problem hiding this comment.
It would be good to update aggregate.
|
Test build #70552 has finished for PR 16391 at commit
|
|
Test build #70553 has finished for PR 16391 at commit
|
|
Test build #70619 has finished for PR 16391 at commit
|
|
thanks for the review, merging to master! |
|
Seems there is no explicit LGTM. I am reverting this change from master. |
|
Just reverted from master. btw, I think we can keep existing cases and then add new cases. |
|
I agree that it is quite nice to have multiple record types in the benchmark to reveal the source of overheads! |
## What changes were proposed in this pull request? Currently `DatasetBenchmark` use `case class Data(l: Long, s: String)` as the record type of `RDD` and `Dataset`, which introduce serialization overhead only to `Dataset` and is unfair. This PR use `Long` as the record type, to be fairer for `Dataset` ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes apache#16391 from cloud-fan/benchmark.
## What changes were proposed in this pull request? Currently `DatasetBenchmark` use `case class Data(l: Long, s: String)` as the record type of `RDD` and `Dataset`, which introduce serialization overhead only to `Dataset` and is unfair. This PR use `Long` as the record type, to be fairer for `Dataset` ## How was this patch tested? existing tests Author: Wenchen Fan <wenchen@databricks.com> Closes apache#16391 from cloud-fan/benchmark.
What changes were proposed in this pull request?
Currently
DatasetBenchmarkusecase class Data(l: Long, s: String)as the record type ofRDDandDataset, which introduce serialization overhead only toDatasetand is unfair.This PR use
Longas the record type, to be fairer forDatasetHow was this patch tested?
existing tests